package drds.data_propagate.parse;


import drds.data_propagate.binlog_event.BinLogEvent;
import drds.data_propagate.binlog_event.event.*;
import drds.data_propagate.binlog_event.event.TableMapEvent.ColumnInfo;
import drds.data_propagate.binlog_event.event.mariadb.AnnotateRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.DeleteRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.RowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.UpdateRowsEvent;
import drds.data_propagate.binlog_event.event.rows_event.WriteRowsEvent;
import drds.data_propagate.binlog_event.exception.TableIdNotFoundException;
import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.entry.*;
import drds.data_propagate.entry.position.EntryPosition;
import drds.data_propagate.filter.aviater.AviaterRegexFilter;
import drds.data_propagate.parse.ddl.DdlResult;
import drds.data_propagate.parse.ddl.DruidDdlParser;
import drds.data_propagate.parse.ddl.SimpleDdlParser;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.table_meta_data.ColumnMetaData;
import drds.data_propagate.parse.table_meta_data.TableMetaData;
import drds.data_propagate.parse.table_meta_data.TableMetaDataCache;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.sql.Types;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

/**
 * Converts a {@linkplain BinLogEvent} into an {@code Entry} object.
 */
public class BinlogEventConvertToEntry extends AbstractLifeCycle implements BinlogParser<BinLogEvent> {

    // Property keys attached to XA transaction entries (see parseQueryEvent).
    public static final String XA_XID = "XA_XID";
    public static final String XA_TYPE = "XA_TYPE";
    // XA statement prefixes; used both to recognise the statement and as the XA_TYPE property value.
    public static final String XA_START = "XA START";
    public static final String XA_END = "XA END";
    public static final String XA_COMMIT = "XA COMMIT";
    public static final String XA_ROLLBACK = "XA ROLLBACK";
    // Charset names.
    public static final String ISO_8859_1 = "ISO-8859-1";
    public static final String UTF_8 = "UTF-8";
    // Despite the *_MAX_VALUE names, these are 2^8, 2^16, 2^24, 2^32 and 2^64, i.e. one more than
    // the largest unsigned value of each width.
    // NOTE(review): presumably used for signed-to-unsigned conversion; the usage is not visible in this section.
    public static final int TINYINT_MAX_VALUE = 256;
    public static final int SMALLINT_MAX_VALUE = 65536;
    public static final int MEDIUMINT_MAX_VALUE = 16777216;
    public static final long INTEGER_MAX_VALUE = 4294967296L;
    public static final BigInteger BIGINT_MAX_VALUE = new BigInteger("18446744073709551616");
    public static final int version = 1;
    // Statement suffixes that mark plain transaction boundaries in query events.
    public static final String BEGIN = "BEGIN";
    public static final String COMMIT = "COMMIT";
    public static final Logger logger = LoggerFactory.getLogger(BinlogEventConvertToEntry.class);
    @Setter
    @Getter
    private volatile AviaterRegexFilter nameFilter; // the reference may be swapped at runtime, e.g. when the filter rules change

    // Names accepted by this filter are dropped (see processFilter / parseRowsEventForTableMetaData).
    @Setter
    @Getter
    private volatile AviaterRegexFilter nameBlackFilter;
    @Setter
    @Getter
    private TableMetaDataCache tableMetaDataCache;
    // Charset used to decode statement text and row values.
    @Setter
    @Getter
    private Charset charset = Charset.defaultCharset();
    // When true, query events that are neither DDL nor DML are dropped.
    @Setter
    @Getter
    private boolean filterQueryDcl = false;
    // When true, DML query events (and rows-query / annotate / variable events) are dropped.
    @Setter
    @Getter
    private boolean filterQueryDml = false;
    // When true, DDL query events are dropped.
    @Setter
    @Getter
    private boolean filterQueryDdl = false;

    // Whether to skip table-related parse errors, e.g. a missing table or a column-count mismatch (issue 92).
    @Setter
    @Getter
    private boolean filterTableError = false;
    @Setter
    @Getter
    // Row filtering: when true, rows events are dropped so that only non-row data is delivered.
    private boolean filterRows = false;
    // Selects DruidDdlParser (true) or SimpleDdlParser (false) for analysing query statements.
    @Setter
    @Getter
    private boolean useDruidDdlFilter = true;

    /**
     * Creates a converter with the default field values; collaborators such as the
     * filters and the table metadata cache are supplied afterwards through the setters.
     */
    public BinlogEventConvertToEntry() {
        // nothing to initialise
    }

    /**
     * Builds a {@link TransactionBegin} carrying the given thread id.
     *
     * @param threadId value stored on the new object via {@code setThreadId}
     * @return the newly created transaction-begin object
     */
    public static TransactionBegin createTransactionBegin(long threadId) {
        final TransactionBegin begin = new TransactionBegin();
        begin.setThreadId(threadId);
        return begin;
    }

    /**
     * Builds a {@link TransactionEnd} carrying the given transaction id.
     *
     * @param transactionId value stored on the new object via {@code setTransactionId}
     * @return the newly created transaction-end object
     */
    public static TransactionEnd createTransactionEnd(long transactionId) {
        final TransactionEnd end = new TransactionEnd();
        end.setTransactionId(transactionId);
        return end;
    }

    /**
     * Builds a {@link KeyValuePair} from a key and a value.
     *
     * @param key   stored via {@code setKey}
     * @param value stored via {@code setValue}
     * @return the newly created pair
     */
    public static KeyValuePair createKeyValuePair(String key, String value) {
        final KeyValuePair pair = new KeyValuePair();
        pair.setKey(key);
        pair.setValue(value);
        return pair;
    }

    /**
     * Assembles an {@link Entry} from its header, type and payload.
     *
     * @param entryHeader header stored on the entry
     * @param entryType   type stored on the entry
     * @param storeValue  payload stored on the entry; callers in this class also pass {@code null}
     * @return the newly created entry
     */
    public static Entry createEntry(EntryHeader entryHeader, EntryType entryType,
                                    Object storeValue) {
        final Entry result = new Entry();
        result.setEntryHeader(entryHeader);
        result.setEntryType(entryType);
        result.setValue(storeValue);
        return result;
    }

    /**
     * Core entry point: dispatches a binlog event to the handler for its event type.
     *
     * @param binLogEvent the event to convert; {@code null} and {@link UnknownEvent} yield {@code null}
     * @param isSeek      forwarded to the query-event handler only
     * @return the converted entry, or {@code null} when the event type is not handled
     *         (including table-map events) or the handler filtered the event out
     * @throws ParseException propagated from the individual handlers
     */
    @Override
    public Entry parse(BinLogEvent binLogEvent, boolean isSeek) throws ParseException {
        if (binLogEvent == null) {
            return null;
        }
        if (binLogEvent instanceof UnknownEvent) {
            return null;
        }

        final int type = binLogEvent.getHeader().getEventType();
        switch (type) {
            case BinLogEvent.query_event:
                return parseQueryEvent((QueryEvent) binLogEvent, isSeek);
            case BinLogEvent.xid_event:
                return parseXidEvent((XidEvent) binLogEvent);
            case BinLogEvent.write_rows_event_v1:
            case BinLogEvent.write_rows_event:
                return parseRowsEvent((WriteRowsEvent) binLogEvent);
            case BinLogEvent.update_rows_event_v1:
            case BinLogEvent.partial_update_rows_event:
            case BinLogEvent.update_rows_event:
                return parseRowsEvent((UpdateRowsEvent) binLogEvent);
            case BinLogEvent.delete_rows_event_v1:
            case BinLogEvent.delete_rows_event:
                return parseRowsEvent((DeleteRowsEvent) binLogEvent);
            case BinLogEvent.rows_query_log_event:
                return parseRowsQueryEvent((RowsQueryEvent) binLogEvent);
            case BinLogEvent.annotate_rows_event:
                return parseAnnotateRowsEvent((AnnotateRowsEvent) binLogEvent);
            case BinLogEvent.user_var_event:
                return parseUserVarLogEvent((UserVarEvent) binLogEvent);
            case BinLogEvent.intvar_event:
                return parseIntrvarLogEvent((IntvarEvent) binLogEvent);
            case BinLogEvent.rand_event:
                return parseRandLogEvent((RandEvent) binLogEvent);
            case BinLogEvent.gtid_log_event:
                return parseGTIDLogEvent((GtidEvent) binLogEvent);
            case BinLogEvent.heartbeat_log_event:
                return parseHeartbeatLogEvent((HeartbeatEvent) binLogEvent);
            case BinLogEvent.table_map_event:
                // table-map events produce no entry of their own
            default:
                return null;
        }
    }

    /**
     * Clears all cached table metadata; does nothing when no cache is configured.
     */
    public void reset() {
        if (tableMetaDataCache == null) {
            return;
        }
        tableMetaDataCache.clearTableMetaData();
    }

    /**
     * Produces a heartbeat entry. The incoming event itself is not inspected: the entry
     * only carries a header typed {@code mheartbeat} and the {@code heartbeat} entry type.
     */
    private Entry parseHeartbeatLogEvent(HeartbeatEvent heartbeatEvent) {
        final EntryHeader heartbeatHeader = new EntryHeader();
        heartbeatHeader.setEventType(EventType.mheartbeat);

        final Entry heartbeat = new Entry();
        heartbeat.setEntryHeader(heartbeatHeader);
        heartbeat.setEntryType(EntryType.heartbeat);
        return heartbeat;
    }

    /**
     * Converts a GTID event into a {@code gtid_log} entry whose value is a single
     * {@link KeyValuePair}.
     *
     * @param gtidEvent the GTID event to convert
     * @return an entry with an {@code EventType.gtid} header and the pair as its value
     */
    private Entry parseGTIDLogEvent(GtidEvent gtidEvent) {
        Header header = gtidEvent.getHeader();
        KeyValuePair keyValuePair = new KeyValuePair();
        keyValuePair.setKey("gtid");
        keyValuePair.setValue(gtidEvent.getGtidString());

        if (gtidEvent.getLastCommitted() != null) {
            // NOTE(review): the same KeyValuePair instance is reused, so each setKey/setValue
            // pair below overwrites the previous one. When lastCommitted is present the entry
            // ends up carrying only "sequenceNumber"; the "gtid" and "lastCommitted" values are
            // lost. Emitting all three would change the shape of the entry value, so confirm
            // with consumers before fixing.
            keyValuePair.setKey("lastCommitted");
            keyValuePair.setValue(String.valueOf(gtidEvent.getLastCommitted()));
            keyValuePair.setKey("sequenceNumber");
            keyValuePair.setValue(String.valueOf(gtidEvent.getSequenceNumber()));
        }

        EntryHeader entryHeader = createHeader(header, "", "", EventType.gtid);
        return createEntry(entryHeader, EntryType.gtid_log, keyValuePair);
    }

    /**
     * Converts a query event into an entry.
     * <ul>
     * <li>{@code XA START} / {@code XA END} become transaction-begin / transaction-end entries
     * carrying {@code XA_TYPE} and {@code XA_XID} properties.</li>
     * <li>{@code XA COMMIT} / {@code XA ROLLBACK} become {@code row_data} entries with the
     * corresponding event type.</li>
     * <li>Statements ending in {@code BEGIN} / {@code COMMIT} become plain transaction
     * begin/end entries.</li>
     * <li>Everything else is analysed with the configured DDL parser, passed through
     * {@link #processFilter}, and emitted as a {@code row_data} entry unless filtered.</li>
     * </ul>
     *
     * @param queryEvent the query event to convert
     * @param isSeek     when {@code true}, non-DML statements are not applied to the table metadata cache
     * @return the entry, or {@code null} when every parsed statement was filtered out
     */
    private Entry parseQueryEvent(QueryEvent queryEvent, boolean isSeek) {
        String queryString = queryEvent.getQueryString();
        if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
            // XA START is reported as a TransactionBegin
            TransactionBegin transactionBegin = new TransactionBegin();
            transactionBegin.setThreadId(queryEvent.getSessionId());
            transactionBegin.getProps().add(createKeyValuePair(XA_TYPE, XA_START));
            transactionBegin.getProps().add(createKeyValuePair(XA_XID, getXaXid(queryString, XA_START)));

            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), "", "", null);
            return createEntry(entryHeader, EntryType.transaction_begin, transactionBegin);
        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
            // XA END is reported as a TransactionEnd
            TransactionEnd transactionEnd = new TransactionEnd();
            transactionEnd.setTransactionId(String.valueOf(0L));
            transactionEnd.getProps().add(createKeyValuePair(XA_TYPE, XA_END));
            transactionEnd.getProps().add(createKeyValuePair(XA_XID, getXaXid(queryString, XA_END)));

            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), "", "", null);
            return createEntry(entryHeader, EntryType.transaction_end, transactionEnd);
        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
            // XA COMMIT
            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), "", "",
                    EventType.xa_commit);
            RowChange rowChange = new RowChange();
            rowChange.setSql(queryString);
            rowChange.getPropsList().add(createKeyValuePair(XA_TYPE, XA_COMMIT));
            rowChange.getPropsList().add((createKeyValuePair(XA_XID, getXaXid(queryString, XA_COMMIT))));
            rowChange.setEventType(EventType.xa_commit);
            return createEntry(entryHeader, EntryType.row_data, rowChange);
        } else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
            // XA ROLLBACK
            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), "", "",
                    EventType.xa_rollback);
            RowChange rowChange = new RowChange();
            rowChange.setSql(queryString);
            rowChange.getPropsList().add(createKeyValuePair(XA_TYPE, XA_ROLLBACK));
            rowChange.getPropsList().add(createKeyValuePair(XA_XID, getXaXid(queryString, XA_ROLLBACK)));
            rowChange.setEventType(EventType.xa_rollback);
            return createEntry(entryHeader, EntryType.row_data, rowChange);
        } else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
            TransactionBegin transactionBegin = createTransactionBegin(queryEvent.getSessionId());
            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), "", "", null);
            return createEntry(entryHeader, EntryType.transaction_begin, transactionBegin);
        } else if (StringUtils.endsWithIgnoreCase(queryString, COMMIT)) {
            // MyISAM may not produce an xid event, so a COMMIT statement also ends the transaction
            TransactionEnd transactionEnd = createTransactionEnd(0L);
            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), "", "", null);
            return createEntry(entryHeader, EntryType.transaction_end, transactionEnd);
        } else {
            boolean notFilter = false;
            EventType eventType = EventType.query;
            String tableName = null;
            String schemaName = null;
            if (useDruidDdlFilter) {
                List<DdlResult> ddlResultList = DruidDdlParser.parse(queryString, queryEvent.getSchemaName());
                // processFilter must run for every result: it also invalidates cached table metadata
                for (DdlResult ddlResult : ddlResultList) {
                    if (!processFilter(queryString, ddlResult)) {
                        // keep the event as soon as one statement passes the filter
                        notFilter = true;
                    }
                }
                if (ddlResultList.size() > 0) {
                    // for a multi-statement DDL only the first result supplies type/schema/table
                    eventType = ddlResultList.get(0).getEventType();
                    schemaName = ddlResultList.get(0).getSchemaName();
                    tableName = ddlResultList.get(0).getTableName();
                }
            } else {
                DdlResult ddlResult = SimpleDdlParser.parse(queryString, queryEvent.getSchemaName());
                if (!processFilter(queryString, ddlResult)) {
                    notFilter = true;
                }

                eventType = ddlResult.getEventType();
                schemaName = ddlResult.getSchemaName();
                tableName = ddlResult.getTableName();
            }

            if (!notFilter) {
                // everything was filtered out, nothing to emit
                return null;
            }

            boolean isDml = (eventType == EventType.insert || eventType == EventType.update || eventType == EventType.delete);
            // Apply non-DML statements to the table metadata cache. The cache is optional
            // elsewhere in this class (see processFilter), so skip the update when it is absent
            // instead of failing with a NullPointerException.
            if (!isSeek && !isDml && tableMetaDataCache != null) {
                EntryPosition entryPosition = createPosition(queryEvent.getHeader());
                tableMetaDataCache.apply(entryPosition, queryEvent.getSchemaName(), queryString, null);
            }

            EntryHeader entryHeader = createHeader(queryEvent.getHeader(), schemaName, tableName, eventType);
            RowChange rowChange = new RowChange();
            if (eventType != EventType.query && !isDml) {
                rowChange.setDdl(true);
            }
            rowChange.setSql(queryString);
            if (StringUtils.isNotEmpty(queryEvent.getSchemaName())) { // the schema name may be empty
                rowChange.setDdlSchemaName(queryEvent.getSchemaName());
            }
            rowChange.setEventType(eventType);
            return createEntry(entryHeader, EntryType.row_data, rowChange);
        }
    }

    /**
     * Extracts the XID portion of an XA statement, i.e. everything after the XA keyword
     * prefix. Leading whitespace is kept as-is.
     * <p>
     * Callers detect the prefix case-insensitively, so the prefix is stripped
     * case-insensitively as well; otherwise a statement such as {@code xa start 'x'}
     * would yield an empty XID.
     *
     * @param queryString the full statement text
     * @param type        the XA prefix, e.g. {@link #XA_START}
     * @return the text following the prefix; when the statement does not start with the
     *         prefix, the result of {@code StringUtils.substringAfter(queryString, type)}
     */
    private String getXaXid(String queryString, String type) {
        if (queryString != null && type != null && StringUtils.startsWithIgnoreCase(queryString, type)) {
            return queryString.substring(type.length());
        }
        return StringUtils.substringAfter(queryString, type);
    }

    /**
     * Decides whether a parsed statement must be dropped, and as a side effect invalidates
     * cached table metadata for alter / erase / rename statements.
     * <p>
     * The volatile name filters are read once into locals so that a concurrent swap of the
     * reference (possibly to {@code null}) cannot happen between the null check and the call.
     *
     * @param queryString the original statement text, used only in the error message
     * @param result      the parser result for one statement
     * @return {@code true} when the statement is filtered out, {@code false} when it is kept
     * @throws ParseException when a DDL statement is kept by {@code filterQueryDdl} but no table
     *                        name (or, for rename, no original table name) could be parsed
     */
    private boolean processFilter(String queryString, DdlResult result) {
        String schemaName = result.getSchemaName();
        String tableName = result.getTableName();
        EventType eventType = result.getEventType();
        // fixed issue https://github.com/alibaba/canal/issues/58
        // refresh the table metadata cache
        if (tableMetaDataCache != null && (eventType == EventType.alter
                || eventType == EventType.erase || eventType == EventType.rename)) {
            // walk the chain of rename results; a non-rename result simply has no successor
            for (DdlResult renameResult = result; renameResult != null; renameResult = renameResult
                    .getRenameTableResult()) {
                String schemaName0 = renameResult.getSchemaName();
                String tableName0 = renameResult.getTableName();
                if (StringUtils.isNotEmpty(tableName0)) {
                    // table was parsed: clear by full name
                    tableMetaDataCache.clearTableMetaData(schemaName0, tableName0);
                } else {
                    // table could not be parsed: clear the whole schema
                    tableMetaDataCache.clearTableMetaWithSchemaName(schemaName0);
                }
            }
        }

        boolean isDdl = eventType == EventType.alter || eventType == EventType.erase
                || eventType == EventType.create || eventType == EventType.truncate
                || eventType == EventType.rename || eventType == EventType.create_index
                || eventType == EventType.delete_index;
        boolean isDml = eventType == EventType.insert || eventType == EventType.update
                || eventType == EventType.delete;

        if (isDml) {
            // table names are not evaluated for DML statements, so only the global switch applies
            return filterQueryDml;
        }
        if (!isDdl) {
            // everything else is treated as DCL
            return filterQueryDcl;
        }

        if (filterQueryDdl) {
            return true;
        }

        boolean isRename = eventType == EventType.rename;
        if (StringUtils.isEmpty(tableName)
                || (isRename && StringUtils.isEmpty(result.getOriTableName()))) {
            // no table name could be parsed: abort parsing so the statement gets reported
            throw new ParseException(
                    "SimpleDdlParser process queryString failed. pls submit issue with this queryString: " + queryString
                            + " , and DdlResult: " + result.toString());
        }

        String name = schemaName + "." + tableName;

        // white list: for rename, the statement is kept if either the target or the source matches
        AviaterRegexFilter whiteFilter = nameFilter;
        if (whiteFilter != null && !whiteFilter.filter(name)) {
            if (!isRename
                    || !whiteFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
                return true;
            }
        }

        // black list: for rename, the statement is dropped only if both target and source match
        AviaterRegexFilter blackFilter = nameBlackFilter;
        if (blackFilter != null && blackFilter.filter(name)) {
            if (!isRename
                    || blackFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
                return true;
            }
        }

        return false;
    }

    /**
     * Converts a rows-query event (the original DML text that MySQL 5.6+ logs when
     * {@code binlog_rows_query_log_events} is enabled) into a query entry.
     *
     * @param rowsQueryEvent the event carrying the statement text
     * @return the query entry, or {@code null} when DML queries are filtered
     * @throws ParseException when the configured charset name is not supported
     */
    private Entry parseRowsQueryEvent(RowsQueryEvent rowsQueryEvent) {
        if (filterQueryDml) {
            return null;
        }
        try {
            // re-decode the statement text from ISO-8859-1 bytes using the configured charset
            final byte[] rawQuery = rowsQueryEvent.getRowsQuery().getBytes(ISO_8859_1);
            final String sql = new String(rawQuery, charset.name());

            String firstTableName = null;
            if (useDruidDdlFilter) {
                final List<DdlResult> parsed = DruidDdlParser.parse(sql, null);
                if (!parsed.isEmpty()) {
                    firstTableName = parsed.get(0).getTableName();
                }
            }
            return buildQueryEntry(sql, rowsQueryEvent.getHeader(), firstTableName);
        } catch (UnsupportedEncodingException e) {
            throw new ParseException(e);
        }
    }

    /**
     * Converts a MariaDB annotate-rows event (the original DML text, logged when
     * {@code binlog_annotate_row_events} is enabled) into a query entry.
     *
     * @param annotateRowsEvent the event carrying the statement text
     * @return the query entry, or {@code null} when DML queries are filtered
     * @throws ParseException when the configured charset name is not supported
     */
    private Entry parseAnnotateRowsEvent(AnnotateRowsEvent annotateRowsEvent) {
        if (filterQueryDml) {
            return null;
        }
        try {
            // re-decode the statement text from ISO-8859-1 bytes using the configured charset
            final byte[] rawQuery = annotateRowsEvent.getRowsQuery().getBytes(ISO_8859_1);
            final String sql = new String(rawQuery, charset.name());
            return buildQueryEntry(annotateRowsEvent.getHeader(), sql);
        } catch (UnsupportedEncodingException e) {
            throw new ParseException(e);
        }
    }

    /**
     * Converts a user-variable event into a query entry built from the event's query text.
     *
     * @return the query entry, or {@code null} when DML queries are filtered
     */
    private Entry parseUserVarLogEvent(UserVarEvent userVarEvent) {
        return filterQueryDml
                ? null
                : buildQueryEntry(userVarEvent.getHeader(), userVarEvent.getQuery());
    }

    /**
     * Converts an intvar event into a query entry built from the event's query text.
     *
     * @return the query entry, or {@code null} when DML queries are filtered
     */
    private Entry parseIntrvarLogEvent(IntvarEvent intvarEvent) {
        return filterQueryDml
                ? null
                : buildQueryEntry(intvarEvent.getHeader(), intvarEvent.getQuery());
    }

    /**
     * Converts a rand event into a query entry built from the event's query text.
     *
     * @return the query entry, or {@code null} when DML queries are filtered
     */
    private Entry parseRandLogEvent(RandEvent randEvent) {
        return filterQueryDml
                ? null
                : buildQueryEntry(randEvent.getHeader(), randEvent.getQuery());
    }

    /**
     * Converts an xid event into a transaction-end entry whose transaction id is the event's xid.
     *
     * @param xidEvent the xid event to convert
     * @return a {@code transaction_end} entry with empty schema/table names in its header
     */
    private Entry parseXidEvent(XidEvent xidEvent) {
        final TransactionEnd end = createTransactionEnd(xidEvent.getXid());
        final EntryHeader xidHeader = createHeader(xidEvent.getHeader(), "", "", null);
        return createEntry(xidHeader, EntryType.transaction_end, end);
    }

    /**
     * Resolves the table metadata that applies to a rows event.
     * <p>
     * Heartbeat tables get mocked metadata; every other table is looked up through the
     * table metadata cache. The cache is optional: without one, RDS heartbeat detection is
     * skipped and no lookup is made. The volatile name filters are read once into locals so
     * that a concurrent swap of the reference cannot happen between the null check and the call.
     *
     * @param rowsEvent the rows event whose table is to be resolved
     * @return the table metadata, or {@code null} when the table is rejected by the name
     *         filters, when no metadata is available and {@code filterTableError} is set,
     *         or when no cache is configured and the table is not a heartbeat table
     * @throws TableIdNotFoundException when the event has no table-map event
     * @throws ParseException           when the cache has no metadata for the table and
     *                                  {@code filterTableError} is not set
     */
    public TableMetaData parseRowsEventForTableMetaData(RowsEvent rowsEvent) {
        TableMapEvent tableMapEvent = rowsEvent.getTableMapEvent();
        if (tableMapEvent == null) {
            // no table-map record exists for this table id
            throw new TableIdNotFoundException("not found tableId:" + rowsEvent.getTableId());
        }

        boolean isHeartBeat = isAliSQLHeartBeat(tableMapEvent.getSchemaName(), tableMapEvent.getTableName());
        // guard the cache here as well; it is already treated as optional further down
        boolean isRDSHeartBeat = tableMetaDataCache != null && tableMetaDataCache.isOnRDS()
                && isRDSHeartBeat(tableMapEvent.getSchemaName(), tableMapEvent.getTableName());

        String schemaNameAndTableName = tableMapEvent.getSchemaName() + "." + tableMapEvent.getTableName();
        // apply the white list, then the black list
        AviaterRegexFilter whiteFilter = nameFilter;
        if (whiteFilter != null && !whiteFilter.filter(schemaNameAndTableName)) {
            return null;
        }
        AviaterRegexFilter blackFilter = nameBlackFilter;
        if (blackFilter != null && blackFilter.filter(schemaNameAndTableName)) {
            return null;
        }

        TableMetaData tableMetaData = null;
        if (isRDSHeartBeat) {
            // RDS heartbeat data (mysql.ha_health_check): the heartbeat table is usually not
            // readable, so mock its metadata.
            // NOTE(review): the second column is named "eventType" here; confirm this matches the
            // real heartbeat table and is not the residue of a mechanical rename.
            ColumnMetaData id = new ColumnMetaData("id", "bigint(20)", true, false, "0");
            ColumnMetaData type = new ColumnMetaData("eventType", "char(1)", false, true, "0");
            tableMetaData = new TableMetaData(tableMapEvent.getSchemaName(), tableMapEvent.getTableName(), Arrays.asList(id, type));
        } else if (isHeartBeat) {
            // AliSQL heartbeat data (test.heartbeat): the heartbeat table is usually not
            // readable, so mock its metadata.
            ColumnMetaData idMeta = new ColumnMetaData("id", "smallint(6)", false, true, null);
            ColumnMetaData typeMeta = new ColumnMetaData("ts", "int(11)", true, false, null);
            tableMetaData = new TableMetaData(tableMapEvent.getSchemaName(), tableMapEvent.getTableName(), Arrays.asList(idMeta, typeMeta));
        }

        if (tableMetaDataCache != null && tableMetaData == null) {
            // not a mocked heartbeat table: look the metadata up at this event's position
            EntryPosition entryPosition = createPosition(rowsEvent.getHeader());
            tableMetaData = getTableMetaData(tableMapEvent.getSchemaName(), tableMapEvent.getTableName(), true, entryPosition);
            if (tableMetaData == null && !filterTableError) {
                throw new ParseException("not found [" + schemaNameAndTableName + "] in dataBaseName , pls check!");
            }
        }

        return tableMetaData;
    }

    /**
     * Converts a rows event, resolving the table metadata from the event itself.
     *
     * @param rowsEvent the rows event to convert
     * @return the result of {@link #parseRowsEvent(TableMetaData, RowsEvent)} called with no
     *         preset table metadata
     */
    public Entry parseRowsEvent(RowsEvent rowsEvent) {
        final TableMetaData noPresetMetaData = null;
        return parseRowsEvent(noPresetMetaData, rowsEvent);
    }

    /**
     * Converts a write / update / delete rows event into a {@code row_data} entry.
     * <p>
     * Insert rows are parsed as "after" images, delete rows as "before" images, and update
     * rows as a "before" image followed by an "after" image read from the same buffer.
     *
     * @param tableMetaData table metadata to use, or {@code null} to resolve it from the event
     * @param rowsEvent     the rows event to convert
     * @return the entry, or {@code null} when rows are filtered, no table metadata is
     *         available, or a table error was reported while parsing a row
     * @throws ParseException wrapping any exception raised during conversion
     */
    public Entry parseRowsEvent(TableMetaData tableMetaData, RowsEvent rowsEvent) {
        if (filterRows) {
            return null;
        }
        try {
            TableMetaData resolvedMeta = tableMetaData;
            if (resolvedMeta == null) {
                // nothing supplied by the caller: resolve from the event
                resolvedMeta = parseRowsEventForTableMetaData(rowsEvent);
                if (resolvedMeta == null) {
                    // no table structure available, ignore the event
                    return null;
                }
            }

            final EventType eventType;
            switch (rowsEvent.getHeader().getEventType()) {
                case BinLogEvent.write_rows_event_v1:
                case BinLogEvent.write_rows_event:
                    eventType = EventType.insert;
                    break;
                case BinLogEvent.update_rows_event_v1:
                case BinLogEvent.update_rows_event:
                case BinLogEvent.partial_update_rows_event:
                    eventType = EventType.update;
                    break;
                case BinLogEvent.delete_rows_event_v1:
                case BinLogEvent.delete_rows_event:
                    eventType = EventType.delete;
                    break;
                default:
                    throw new ParseException("unsupport event eventType :" + rowsEvent.getHeader().getEventType());
            }

            final RowChange change = new RowChange();
            change.setTableId(rowsEvent.getTableId());
            change.setDdl(false);
            change.setEventType(eventType);

            final RowsEventBuffer buffer = rowsEvent.getRowsEventBuffer(charset.name());
            final BitSet beforeBits = rowsEvent.getColumnBitSet();
            final BitSet afterBits = rowsEvent.getChangeColumnBitSet();

            boolean tableError = false;
            int rowsCount = 0;
            while (buffer.nextOneRow(beforeBits, false)) {
                final RowData row = new RowData();
                if (eventType == EventType.insert) {
                    // inserted values go into the "after" columns
                    tableError |= parseOneRow(row, rowsEvent, buffer, beforeBits, true, resolvedMeta);
                } else if (eventType == EventType.delete) {
                    // deleted values go into the "before" columns
                    tableError |= parseOneRow(row, rowsEvent, buffer, beforeBits, false, resolvedMeta);
                } else {
                    // update: "before" image first, then the "after" image
                    tableError |= parseOneRow(row, rowsEvent, buffer, beforeBits, false, resolvedMeta);
                    if (!buffer.nextOneRow(afterBits, true)) {
                        // no "after" image: keep the half-filled row (not counted) and stop
                        change.getRowDataList().add(row);
                        break;
                    }

                    tableError |= parseOneRow(row, rowsEvent, buffer, afterBits, true, resolvedMeta);
                }

                rowsCount++;
                change.getRowDataList().add(row);
            }

            final TableMapEvent tableMap = rowsEvent.getTableMapEvent();
            final EntryHeader rowsHeader = createHeader(rowsEvent.getHeader(), tableMap.getSchemaName(), tableMap.getTableName(), eventType, rowsCount);

            if (!tableError) {
                return createEntry(rowsHeader, EntryType.row_data, change);
            }

            // a table error was reported: log what was parsed and drop the event
            final Entry failed = createEntry(rowsHeader, EntryType.row_data, null);
            logger.warn("tableName parser error : {}value: {}", failed.toString(), change.toString());
            return null;
        } catch (Exception e) {
            throw new ParseException("parse row data failed.", e);
        }
    }

    /**
     * Builds the {@link EntryPosition} of an event from its header.
     *
     * @param header the event header
     * @return a position made of the log file name, the event's start offset
     *         (next-event position minus event length), the execute timestamp multiplied
     *         by 1000 (the header records seconds), and the server id
     */
    private EntryPosition createPosition(Header header) {
        return new EntryPosition(
                header.getLogFileName(),
                header.getNextBinLogEventPosition() - header.getEventLength(),
                header.getExecuteTimeStamp() * 1000L,
                header.getServerId());
    }

    private boolean parseOneRow(//
                                RowData rowData,//
                                RowsEvent rowsEvent,//
                                RowsEventBuffer rowsEventBuffer, //
                                BitSet columnNullBitSet,//
                                boolean isAfter,//
                                TableMetaData tableMetaData) throws UnsupportedEncodingException {
        int columnCount = rowsEvent.getTableMapEvent().getColumnCount();
        ColumnInfo[] columnInfos = rowsEvent.getTableMapEvent().getColumnInfos();
        // mysql8.0针对set @@global.binlog_row_metadata='FULL' 可以记录部分的metadata信息
        boolean existOptionalMetaData = rowsEvent.getTableMapEvent().isExistOptionalMetaData();
        boolean tableError = false;
        // check tableName fileds count，只能处理加字段
        boolean existRDSNoPrimaryKey = false;
        if (tableMetaData != null && columnInfos.length > tableMetaData.getColumnMetaDataList().size()) {
            //获取主键
            if (tableMetaDataCache.isOnRDS()) {
                // 特殊处理下RDS的场景
                List<ColumnMetaData> primaryKeyColumnMetaDataList = tableMetaData.getPrimaryKeyColumnMetaDataList();
                if (primaryKeyColumnMetaDataList == null || primaryKeyColumnMetaDataList.isEmpty()) {
                    if (columnInfos.length == tableMetaData.getColumnMetaDataList().size() + 1
                            && columnInfos[columnInfos.length - 1].type == BinLogEvent.mysql_type_longlong) {
                        existRDSNoPrimaryKey = true;
                    }
                }
            }

            EntryPosition entryPosition = createPosition(rowsEvent.getHeader());
            if (!existRDSNoPrimaryKey) {
                // online ddl增加字段操作步骤：
                // 1. 新增一张临时表，将需要做ddl表的数据全量导入
                // 2. 在老表上建立I/U/D的trigger，增量的将数据插入到临时表
                // 3. 锁住应用请求，将临时表rename为老表的名字，完成增加字段的操作
                // 尝试做一次reload，可能因为ddl没有正确解析，或者使用了类似online ddl的操作
                // 因为online ddl没有对应表名的alter语法，所以不会有clear cache的操作
                tableMetaData = getTableMetaData(rowsEvent.getTableMapEvent().getSchemaName(), rowsEvent.getTableMapEvent().getTableName(), false, entryPosition);// 强制重新获取一次
                if (tableMetaData == null) {
                    tableError = true;
                    if (!filterTableError) {
                        throw new ParseException("not found [" + rowsEvent.getTableMapEvent().getSchemaName() + "."
                                + rowsEvent.getTableMapEvent().getTableName() + "] in dataBaseName , pls check!");
                    }
                }

                // 在做一次判断
                if (tableMetaData != null && columnInfos.length > tableMetaData.getColumnMetaDataList().size()) {
                    tableError = true;
                    if (!filterTableError) {
                        throw new ParseException("column size is not match for tableName:" + tableMetaData.getFullName()
                                + "," + columnInfos.length + " vs " + tableMetaData.getColumnMetaDataList().size());
                    }
                }
                // } else {
                // log.warn("[" + event.getTableName().getDbName() + "." +
                // event.getTableName().getTableName()
                // + "] is no primary primaryKey , skip alibaba_rds_row_id column");
            }
        }

        for (int index = 0; index < columnCount; index++) {
            ColumnInfo columnInfo = columnInfos[index];
            // mysql 5.6开始支持nolob/mininal类型,并不一定记录所有的列,需要进行判断
            if (!columnNullBitSet.get(index)) {
                continue;
            }

            if (existRDSNoPrimaryKey && index == columnCount - 1 && columnInfo.type == BinLogEvent.mysql_type_longlong) {
                // 不解析最后一列
                String rdsRowIdColumnName = "#alibaba_rds_row_id#";
                rowsEventBuffer.nextValue(rdsRowIdColumnName, index, columnInfo.type, columnInfo.meta, false);
                Column column = new Column();
                column.setName(rdsRowIdColumnName);
                column.setKey(true);
                column.setMysqlType("bigint");
                column.setIndex(index);
                column.setNull(false);
                Serializable value = rowsEventBuffer.getValue();
                column.setValue(value.toString());
                column.setSqlType(Types.BIGINT);
                column.setUpdated(false);

                if (isAfter) {
                    rowData.getAfterColumnList().add(column);
                } else {
                    rowData.getBeforeColumnList().add(column);
                }
                continue;
            }

            ColumnMetaData columnMetaData = null;
            if (tableMetaData != null && !tableError) {
                // 处理file meta
                columnMetaData = tableMetaData.getColumnMetaDataList().get(index);
            }

            if (columnMetaData != null && existOptionalMetaData && tableMetaDataCache.isOnTSDB()) {
                // check column info(保证三样属性一致)
                boolean check = StringUtils.equalsIgnoreCase(columnMetaData.getColumnName(), columnInfo.name);
                check &= (columnMetaData.isUnsigned() == columnInfo.unsigned);
                check &= (columnMetaData.isNullable() == columnInfo.nullable);
                if (!check) {
                    throw new ParseException("MySQL8.0 unmatch column metadata & pls submit issue , tableName : "
                            + tableMetaData.getFullName() + ", dataBaseName columnMetaData : " + columnMetaData.toString()
                            + " , binlog_event columnMetaData : " + columnInfo.toString() + " , on : "
                            + rowsEvent.getHeader().getLogFileName() + ":"
                            + (rowsEvent.getHeader().getNextBinLogEventPosition() - rowsEvent.getHeader().getEventLength()));
                }
            }

            Column column = new Column();
            if (columnMetaData != null) {
                column.setName(columnMetaData.getColumnName());
                column.setKey(columnMetaData.isPrimaryKey());
                // 增加mysql type类型,issue 73
                column.setMysqlType(columnMetaData.getColumnType());
            } else if (existOptionalMetaData) {
                column.setName(columnInfo.name);
                column.setKey(columnInfo.pk);
                // mysql8.0里没有mysql type类型
                // columnBuilder.setMysqlType(columnMetaData.getColumnType());
            }
            column.setIndex(index);
            column.setNull(false);

            // fixed issue
            // https://github.com/alibaba/canal/issues/66，特殊处理binary/varbinary，不能做编码处理
            boolean isBinary = false;
            boolean isSingleBit = false;
            if (columnMetaData != null) {
                if (StringUtils.containsIgnoreCase(columnMetaData.getColumnType(), "VARBINARY")) {
                    isBinary = true;
                } else if (StringUtils.containsIgnoreCase(columnMetaData.getColumnType(), "BINARY")) {
                    isBinary = true;
                } else if (StringUtils.containsIgnoreCase(columnMetaData.getColumnType(), "TINYINT(1)")) {
                    isSingleBit = true;
                }
            }

            rowsEventBuffer.nextValue(column.getName(), index, columnInfo.type, columnInfo.meta, isBinary);
            int javaType = rowsEventBuffer.getJavaType();
            if (rowsEventBuffer.isFNull()) {
                column.setNull(true);
            } else {
                final Serializable value = rowsEventBuffer.getValue();
                // 处理各种类型
                switch (javaType) {
                    case Types.INTEGER:
                    case Types.TINYINT:
                    case Types.SMALLINT:
                    case Types.BIGINT:
                        // 处理unsigned类型
                        Number number = (Number) value;
                        boolean isUnsigned = (columnMetaData != null ? columnMetaData.isUnsigned()
                                : (existOptionalMetaData ? columnInfo.unsigned : false));
                        if (isUnsigned && number.longValue() < 0) {
                            switch (rowsEventBuffer.getLength()) {
                                case 1: /* MYSQL_TYPE_TINY */
                                    column.setValue(String.valueOf(Integer.valueOf(TINYINT_MAX_VALUE + number.intValue())));
                                    javaType = Types.SMALLINT; // 往上加一个量级
                                    break;

                                case 2: /* MYSQL_TYPE_SHORT */
                                    column.setValue(String.valueOf(Integer.valueOf(SMALLINT_MAX_VALUE + number.intValue())));
                                    javaType = Types.INTEGER; // 往上加一个量级
                                    break;

                                case 3: /* MYSQL_TYPE_INT24 */
                                    column.setValue(String.valueOf(Integer.valueOf(MEDIUMINT_MAX_VALUE + number.intValue())));
                                    javaType = Types.INTEGER; // 往上加一个量级
                                    break;

                                case 4: /* MYSQL_TYPE_LONG */
                                    column.setValue(String.valueOf(Long.valueOf(INTEGER_MAX_VALUE + number.longValue())));
                                    javaType = Types.BIGINT; // 往上加一个量级
                                    break;

                                case 8: /* MYSQL_TYPE_LONGLONG */
                                    column.setValue(BIGINT_MAX_VALUE.add(BigInteger.valueOf(number.longValue())).toString());
                                    javaType = Types.DECIMAL; // 往上加一个量级，避免执行出错
                                    break;
                            }
                        } else {
                            // 对象为number类型，直接valueof即可
                            column.setValue(String.valueOf(value));
                        }

                        if (isSingleBit && javaType == Types.TINYINT) {
                            javaType = Types.BIT;
                        }
                        break;
                    case Types.REAL: // float
                    case Types.DOUBLE: // double
                        // 对象为number类型，直接valueof即可
                        column.setValue(String.valueOf(value));
                        break;
                    case Types.BIT:// bit
                        // 对象为number类型
                        column.setValue(String.valueOf(value));
                        break;
                    case Types.DECIMAL:
                        column.setValue(((BigDecimal) value).toPlainString());
                        break;
                    case Types.TIMESTAMP:
                        // 修复时间边界值
                        // String v = value.toString();
                        // v = v.substring(0, v.length() - 2);
                        // columnBuilder.setValue(v);
                        // break;
                    case Types.TIME:
                    case Types.DATE:
                        // 需要处理year
                        column.setValue(value.toString());
                        break;
                    case Types.BINARY:
                    case Types.VARBINARY:
                    case Types.LONGVARBINARY:
                        // fixed text encoding
                        // https://github.com/AlibabaTech/canal/issues/18
                        // mysql binlog中blob/text都处理为blob类型，需要反查table
                        // meta，按编码解析text
                        if (columnMetaData != null && isText(columnMetaData.getColumnType())) {
                            column.setValue(new String((byte[]) value, charset));
                            javaType = Types.CLOB;
                        } else {
                            // byte数组，直接使用iso-8859-1保留对应编码，浪费内存
                            column.setValue(new String((byte[]) value, ISO_8859_1));
                            // columnBuilder.setValueBytes(ByteString.copyFrom((byte[])
                            // value));
                            javaType = Types.BLOB;
                        }
                        break;
                    case Types.CHAR:
                    case Types.VARCHAR:
                        column.setValue(value.toString());
                        break;
                    default:
                        column.setValue(value.toString());
                }
            }

            column.setSqlType(javaType);
            // 设置是否update的标记位
            column.setUpdated(isAfter && isUpdate(rowData.getBeforeColumnList(), index, column.isNull() ? null : (String) column.getValue()));
            if (isAfter) {
                rowData.getAfterColumnList().add(column);
            } else {
                rowData.getBeforeColumnList().add(column);
            }
        }

        return tableError;

    }

    /**
     * Wraps a raw query statement into a {@code row_data} entry that is tagged with a table name.
     * The entry header is created with an empty schema name and the {@code query} event type.
     *
     * @param queryString SQL text stored on the row change
     * @param logHeader   binlog event header the entry header is derived from
     * @param tableName   table name recorded on the entry header
     * @return the entry produced by {@code createEntry} for the header and row change
     */
    private Entry buildQueryEntry(String queryString, Header logHeader, String tableName) {
        final EventType queryType = EventType.query;
        EntryHeader queryHeader = createHeader(logHeader, "", tableName, queryType);

        RowChange queryChange = new RowChange();
        queryChange.setSql(queryString);
        queryChange.setEventType(queryType);

        return createEntry(queryHeader, EntryType.row_data, queryChange);
    }

    /**
     * Wraps a raw query statement into a {@code row_data} entry that is not bound to any
     * schema or table: both names are passed to the header as empty strings.
     *
     * @param logHeader   binlog event header the entry header is derived from
     * @param queryString SQL text stored on the row change
     * @return the entry produced by {@code createEntry} for the header and row change
     */
    private Entry buildQueryEntry(Header logHeader, String queryString) {
        final String unbound = "";
        final EventType queryType = EventType.query;
        EntryHeader queryHeader = createHeader(logHeader, unbound, unbound, queryType);

        RowChange queryChange = new RowChange();
        queryChange.setSql(queryString);
        queryChange.setEventType(queryType);

        return createEntry(queryHeader, EntryType.row_data, queryChange);
    }

    /**
     * Convenience overload for events without a row count. Delegates to the five-argument
     * variant with a row count of {@code -1}, which that variant does not record as a property.
     *
     * @param logHeader  binlog event header to copy position and server information from
     * @param schemaName schema name for the entry header, may be {@code null}
     * @param tableName  table name for the entry header, may be {@code null}
     * @param eventType  event type for the entry header, may be {@code null}
     * @return the populated entry header
     */
    private EntryHeader createHeader(Header logHeader, String schemaName,
                                     String tableName, EventType eventType) {
        // A non-positive count means "no rowsCount property".
        final int rowsCountAbsent = -1;
        return createHeader(logHeader, schemaName, tableName, eventType, rowsCountAbsent);
    }

    /**
     * Builds the {@link EntryHeader} for an entry from the binlog event header.
     * The header deliberately duplicates information from the event so that entries can
     * later be searched or filtered without re-reading the binlog.
     *
     * @param logHeader  binlog event header to copy file name, offset, server id, execute time,
     *                   event length and GTID information from
     * @param schemaName schema name to record; skipped when {@code null}
     * @param tableName  table name to record; skipped when {@code null}
     * @param eventType  event type to record; skipped when {@code null}
     * @param rowsCount  number of rows in the event; recorded as the {@code rowsCount} property
     *                   only when positive, and ignored when {@code null} or not positive
     * @return the populated entry header
     */
    private EntryHeader createHeader(Header logHeader, String schemaName,
                                     String tableName, EventType eventType, Integer rowsCount) {
        EntryHeader entryHeader = new EntryHeader();
        entryHeader.setVersion(version);
        entryHeader.setLogFileName(logHeader.getLogFileName());
        // Record the start offset of this binlog event: next event position minus this event's length.
        entryHeader.setLogfileOffset(logHeader.getNextBinLogEventPosition() - logHeader.getEventLength());
        entryHeader.setServerId(logHeader.getServerId());
        // Once the data has passed through Java, everything emitted is unicode.
        entryHeader.setServerenCode(UTF_8);
        // NOTE(review): presumably converts a seconds timestamp to milliseconds - confirm against Header.
        entryHeader.setExecuteTime(logHeader.getExecuteTimeStamp() * 1000L);

        if (eventType != null) {
            entryHeader.setEventType(eventType);
        }
        if (schemaName != null) {
            entryHeader.setSchemaName(schemaName);
        }
        if (tableName != null) {
            entryHeader.setTableName(tableName);
        }
        entryHeader.setEventLength(logHeader.getEventLength());

        // GTID set of the event, when present.
        if (StringUtils.isNotEmpty(logHeader.getGtidSetString())) {
            entryHeader.setGtid(logHeader.getGtidSetString());
        }
        // Current GTID.
        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidString())) {
            KeyValuePair keyValuePair = createKeyValuePair("curtGtid", logHeader.getCurrentGtidString());
            entryHeader.getProps().add(keyValuePair);
        }
        // Current GTID sequence number.
        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidSequenceNumber())) {
            KeyValuePair keyValuePair = createKeyValuePair("curtGtidSn", logHeader.getCurrentGtidSequenceNumber());
            entryHeader.getProps().add(keyValuePair);
        }
        // Current GTID last-committed value.
        if (StringUtils.isNotEmpty(logHeader.getCurrentGtidLastCommitted())) {
            KeyValuePair keyValuePair = createKeyValuePair("curtGtidLct", logHeader.getCurrentGtidLastCommitted());
            entryHeader.getProps().add(keyValuePair);
        }

        // Row count support. rowsCount is a boxed Integer, so guard against null before the
        // comparison; unboxing a null here would otherwise throw a NullPointerException.
        if (rowsCount != null && rowsCount > 0) {
            KeyValuePair keyValuePair = createKeyValuePair("rowsCount", String.valueOf(rowsCount));
            entryHeader.getProps().add(keyValuePair);
        }
        return entryHeader;
    }

    /**
     * Decides whether the column at {@code index} changed between the before image and the
     * new value of the after image.
     *
     * @param columnList before-image columns to compare against; must not be {@code null}
     * @param index      column index to look up; a negative index is reported as "not updated"
     * @param newValue   after-image value as a string, or {@code null} when the new value is NULL
     * @return {@code false} when a before column with the same index holds an equal value
     *         (both NULL, or both non-NULL and equal); {@code true} otherwise, including when
     *         no before column with that index exists
     * @throws ParseException when {@code columnList} is {@code null}
     */
    private boolean isUpdate(List<Column> columnList, int index, String newValue) {
        if (columnList == null) {
            throw new ParseException("ERROR ## the bfColumns is null");
        }

        if (index < 0) {
            return false;
        }

        for (Column column : columnList) {
            if (column.getIndex() != index) {
                continue;
            }
            // Compare the before and after image of the same column position.
            if (column.isNull()) {
                if (newValue == null) {
                    // Both sides are NULL: unchanged.
                    return false;
                }
            } else if (newValue != null && newValue.equals(column.getValue())) {
                // Both sides are non-NULL and equal: unchanged (see issue #135 for the old-NULL case).
                // equals is invoked on newValue, which is known to be non-null, so a before column
                // that is flagged non-null but carries no value cannot trigger a NullPointerException.
                return false;
            }
        }

        // In noblob/minimal row image mode the before record may be missing; treat that as changed.
        return true;
    }

    /**
     * Looks up table metadata through {@code tableMetaDataCache}, optionally tolerating
     * lookups that fail because the table is gone or not accessible.
     *
     * @param schemaName    schema of the table
     * @param tableName     name of the table
     * @param useCache      forwarded to the cache lookup
     * @param entryPosition forwarded to the cache lookup
     * @return the table metadata, or {@code null} when {@code filterTableError} is enabled and
     *         the root cause message matches error 1146 ("doesn't exist") or error 1142
     *         ("command denied")
     * @throws ParseException wrapping any other failure of the lookup
     */
    private TableMetaData getTableMetaData(String schemaName, String tableName, boolean useCache, EntryPosition entryPosition) {
        try {
            return tableMetaDataCache.getTableMetaData(schemaName, tableName, useCache, entryPosition);
        } catch (Throwable failure) {
            final String rootMessage = ExceptionUtils.getRootCauseMessage(failure);

            // errorNumber=1146: the table does not exist.
            final boolean tableMissing = StringUtils.contains(rootMessage, "errorNumber=1146")
                    && StringUtils.contains(rootMessage, "doesn't exist");
            // errorNumber=1142: the command was denied on the table.
            final boolean commandDenied = StringUtils.contains(rootMessage, "errorNumber=1142")
                    && StringUtils.contains(rootMessage, "command denied");

            if (filterTableError && (tableMissing || commandDenied)) {
                return null;
            }
            throw new ParseException(failure);
        }
    }

    /**
     * Tells whether a column type name is exactly one of the text types
     * (TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT), ignoring case.
     *
     * @param columnType column type name to test; {@code null} yields {@code false}
     * @return {@code true} when the name equals one of the four text type names
     */
    private boolean isText(String columnType) {
        if ("LONGTEXT".equalsIgnoreCase(columnType)) {
            return true;
        }
        if ("MEDIUMTEXT".equalsIgnoreCase(columnType)) {
            return true;
        }
        if ("TEXT".equalsIgnoreCase(columnType)) {
            return true;
        }
        return "TINYTEXT".equalsIgnoreCase(columnType);
    }

    /**
     * Tells whether the schema/table pair is {@code test.heartbeat}, compared case-insensitively.
     *
     * @param schema schema name, may be {@code null}
     * @param table  table name, may be {@code null}
     * @return {@code true} only when schema is "test" and table is "heartbeat"
     */
    private boolean isAliSQLHeartBeat(String schema, String table) {
        if (!"test".equalsIgnoreCase(schema)) {
            return false;
        }
        return "heartbeat".equalsIgnoreCase(table);
    }

    /**
     * Tells whether the schema/table pair is {@code mysql.ha_health_check}, compared
     * case-insensitively.
     *
     * @param schema schema name, may be {@code null}
     * @param table  table name, may be {@code null}
     * @return {@code true} only when schema is "mysql" and table is "ha_health_check"
     */
    private boolean isRDSHeartBeat(String schema, String table) {
        if (!"mysql".equalsIgnoreCase(schema)) {
            return false;
        }
        return "ha_health_check".equalsIgnoreCase(table);
    }


}
